package rocketmq

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"go.uber.org/zap"
	"niu-ren/component/ck_log"
	"niu-ren/utils/common"
	"strings"
	"sync"
	"time"
)

// CKConsumer wraps a RocketMQ push consumer together with the settings it was
// created from and two flags tracking how far Subscribe got.
type CKConsumer struct {
	// ctx is the context passed to NewCKConsumer; the methods use it to obtain
	// a logger via ck_log.LogCtx.
	ctx        context.Context
	// Host is the raw host string given to NewCKConsumer, which splits it on
	// ";" to build the name-server address list.
	Host       string
	// Topic is the topic Subscribe registers the handler for.
	Topic      string
	// Group is the consumer group name.
	Group      string
	// UseTag is the tag expression that Selector was built from.
	UseTag     string
	// Consumer is the underlying RocketMQ push consumer.
	Consumer   rocketmq.PushConsumer
	// Selector is the TAG-type message selector passed to Consumer.Subscribe.
	Selector   consumer.MessageSelector
	// wg is incremented by Subscribe once the consumer has started and
	// decremented by Stop.
	wg         sync.WaitGroup
	// Subscribed is set once Consumer.Subscribe succeeds and cleared by
	// Unsubscribe.
	Subscribed bool
	// Started is set once Consumer.Start succeeds and cleared by Stop.
	Started    bool
}

// Package-level state shared by every NewCKConsumer call.
var (
	// mux serialises access to cc and preStr.
	mux sync.Mutex
	// cc is the instance counter; it starts at 1 and is incremented before
	// use, so the first consumer gets the suffix 2.
	cc = 1
	// preStr is the instance-name prefix, filled in lazily on first use.
	preStr string
)

// NewCKConsumer creates a CKConsumer backed by a RocketMQ push consumer.
//
// host is a ";"-separated list of addresses handed to a passthrough
// name-server resolver; group is the consumer group name; useTag is the TAG
// expression stored in Selector and later used by Subscribe together with
// topic.
//
// Every call increments the package-level counter cc and uses
// "<preStr>-<cc>" as the client instance name, where preStr is generated once
// via common.GetRandomString(6) (presumably a random 6-character string —
// confirm against that helper). Both variables are guarded by mux.
//
// The signature has no error result, so a failure to build the push consumer
// is logged and the returned CKConsumer carries a nil Consumer; callers can
// check Consumer == nil before using it.
func NewCKConsumer(ctx context.Context, host, topic, group, useTag string) *CKConsumer {
	mux.Lock()
	defer mux.Unlock()
	cc++
	if preStr == "" {
		preStr = common.GetRandomString(6)
	}

	result := &CKConsumer{
		ctx:    ctx,
		Host:   host,
		Topic:  topic,
		Group:  group,
		UseTag: useTag,
		Selector: consumer.MessageSelector{
			Type:       consumer.TAG,
			Expression: useTag,
		},
	}

	pushConsumer, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(group),
		consumer.WithNsResolver(primitive.NewPassthroughResolver(strings.Split(host, ";"))),
		consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
		consumer.WithRetry(3),
		consumer.WithInstance(fmt.Sprintf("%s-%d", preStr, cc)),
	)
	if err != nil {
		// Do not store the value returned alongside a non-nil error: it is
		// not meaningful, and leaving Consumer as a nil interface lets
		// callers detect the failure with a plain == nil check.
		ck_log.LogCtx(ctx).Warnw("[RocketMQ消费] 创建消费者异常", zap.Error(err))
		return result
	}

	result.Consumer = pushConsumer
	return result
}

// Subscribe registers fun as the message handler for c.Topic, filtered by
// c.Selector, and then starts the underlying consumer.
//
// After subscribing, the caller must keep the process from exiting;
// otherwise the consuming goroutines are interrupted.
//
// It returns an error if the consumer is nil, if the subscription is
// rejected, or if the consumer fails to start. The underlying error is
// wrapped, so errors.Is / errors.As work on the result. Subscribed is set as
// soon as the subscription succeeds, even if the subsequent start fails.
func (c *CKConsumer) Subscribe(fun func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error)) error {
	// Calling a method through a nil interface would panic; report it instead.
	if c.Consumer == nil {
		return fmt.Errorf("[RocketMQ消费] 消费者未初始化")
	}

	if err := c.Consumer.Subscribe(c.Topic, c.Selector, fun); err != nil {
		return fmt.Errorf("[RocketMQ消费] 消费订阅异常 %w", err)
	}
	c.Subscribed = true

	if err := c.Consumer.Start(); err != nil {
		return fmt.Errorf("[RocketMQ消费] 启动订阅异常 %w", err)
	}
	ck_log.LogCtx(c.ctx).Info("[RocketMQ消费] 启动订阅成功")

	c.Started = true
	// The counter is released by Stop. The goroutine below only parks on the
	// WaitGroup until then; it does not block this call.
	c.wg.Add(1)
	go c.wg.Wait()
	ck_log.LogCtx(c.ctx).Info("[RocketMQ消费] 启动wg wait成功")

	return nil
}

// Stop reverses what Subscribe set up. If the consumer was started it
// releases the WaitGroup counter and clears Started; if it is still
// subscribed it unsubscribes from topic. It always finishes with a fixed
// 500ms pause before returning.
func (c *CKConsumer) Stop(topic string) {
	const pause = 500 * time.Millisecond

	logger := ck_log.LogCtx(c.ctx)
	logger.Info("[RocketMQ消费] 进入stop函数")

	if started := c.Started; started {
		c.wg.Done()
		c.Started = false
		logger.Info("[RocketMQ消费] wg wait done 成功")
	}

	if subscribed := c.Subscribed; subscribed {
		c.Unsubscribe(topic)
		logger.Info("[RocketMQ消费] Unsubscribe 成功")
	}

	time.Sleep(pause)
}

// Unsubscribe removes the subscription for topic on the underlying consumer.
// Subscribed is cleared whether or not the call succeeds. A failure is logged
// at warn level with the error attached; success is logged at info level.
func (c *CKConsumer) Unsubscribe(topic string) {
	err := c.Consumer.Unsubscribe(topic)
	c.Subscribed = false
	if err != nil {
		ck_log.LogCtx(c.ctx).Warnw("[RocketMQ消费] Unsubscribe", zap.Error(err))
		return
	}
	// Success is not a warning condition, so log it at info level.
	ck_log.LogCtx(c.ctx).Info("[RocketMQ消费] Unsubscribe")
}

// Shutdown closes the queue consumer. It logs the consumer group, shuts down
// the underlying consumer, and logs any error the shutdown returns.
func (c *CKConsumer) Shutdown() {
	// Build the message explicitly: a Sprint-style Info (as on zap's
	// SugaredLogger) does not interpolate a "%s" passed as its first argument.
	ck_log.LogCtx(c.ctx).Info(fmt.Sprintf("[RocketMQ消费] Shutdown; Group: %s", c.Group))
	if err := c.Consumer.Shutdown(); err != nil {
		ck_log.LogCtx(c.ctx).Warnw("[RocketMQ消费] Shutdown", zap.Error(err))
	}
}
